use super::ffi::{
    size_t, MQTTClient, MQTTClientInit, MQTTConnect, MQTTDisconnect, MQTTLenString, MQTTMessage,
    MQTTPacket_connectData, MQTTPacket_willOptions, MQTTPublish, MQTTString, MQTTSubscribe,
    MQTTUnsubscribe, MQTTYield, MessageData, Network, NetworkConnect, NetworkDisconnect,
    NetworkInit, QoS,
};
use super::{MqttClient, MqttConnectOption, MqttHandler, MqttMessage};
use alloc::boxed::Box;
use alloc::string::String;
use alloc::string::ToString;
use alloc::vec::Vec;
use core::ffi::{c_char, c_int, c_ushort, c_void};
use core::ptr::null_mut;
use core::slice::from_raw_parts;
use embed_std::ffi::CString;
use embed_std::try_io_err;
use embed_std::util::{cstr_to_str, cstr_with_len_to_str, str_to_cstring};
use embed_std::{debugln, infoln};
use embed_std::{Error, Result};

/// Size in bytes of the `send_buffer` array handed to `MQTTClientInit`.
const SEND_BUFFER_SIZE: usize = 2048;
/// Size in bytes of the `recv_buffer` array handed to `MQTTClientInit`.
const RECV_BUFFER_SIZE: usize = 2048;

/// MQTT client implemented on top of the Paho C client exposed by `super::ffi`.
///
/// `connect` passes the C client raw pointers to `network`, `send_buffer` and
/// `recv_buffer`, and `subscribe` registers a raw pointer to the whole struct
/// as callback user data. A value therefore has to stay at the same address
/// while it is in use (presumably why `new` returns a `Box`).
pub struct PahoMqttClient {
    /// Transport state operated on by `NetworkInit`, `NetworkConnect` and
    /// `NetworkDisconnect`.
    network: Network,
    /// Paho client state, initialised by `MQTTClientInit` inside `connect`.
    client: MQTTClient,
    /// Buffer given to `MQTTClientInit` as the client's send buffer.
    send_buffer: [u8; SEND_BUFFER_SIZE],
    /// Buffer given to `MQTTClientInit` as the client's receive buffer.
    recv_buffer: [u8; RECV_BUFFER_SIZE],
    /// Receiver for incoming messages, installed through `init`; `None` means
    /// incoming messages are only logged.
    handler: Option<Box<dyn MqttHandler + Send>>,
    /// Topic filters of the current subscriptions. The entries own the C
    /// strings whose pointers were handed to `MQTTSubscribe`.
    topics: Vec<CString>,
}

// `Send` is asserted by hand: the FFI state holds raw pointers (for example
// `client.buf`, `client.readbuf` and `client.ipstack`), so it is not derived
// automatically.
// NOTE(review): this is only sound if the client is driven by one thread at a
// time and the C library keeps no thread-affine state — confirm.
unsafe impl Send for PahoMqttClient {}

impl PahoMqttClient {
    pub fn new() -> Box<PahoMqttClient> {
        unsafe {
            Box::new(PahoMqttClient {
                network: core::mem::zeroed(),
                client: core::mem::zeroed(),
                send_buffer: [0u8; SEND_BUFFER_SIZE],
                recv_buffer: [0u8; RECV_BUFFER_SIZE],
                handler: None,
                topics: Vec::new(),
            })
        }
    }

    pub fn handle_msg(&mut self, topic: &str, msg: &MqttMessage) {
        debugln!(
            "handle msg, topic {}, handler {} ",
            topic,
            self.handler.is_some()
        );
        if let Some(ref handler) = self.handler {
            debugln!("handler {:p}", handler);
            handler.handle(topic, msg);
        }
    }
}

impl MqttClient for PahoMqttClient {
    /// Installs the handler that receives incoming messages, replacing any
    /// previous one; passing `None` removes it.
    ///
    /// Always returns `Ok(())`.
    fn init(&mut self, handler: Option<Box<dyn MqttHandler + Send>>) -> Result<()> {
        self.handler = handler;
        Ok(())
    }

    fn connect(&mut self, server: &str, port: u16, opts: &MqttConnectOption) -> Result<()> {
        self.topics.clear();
        unsafe {
            self.network.my_socket = -1;
            NetworkInit(&mut self.network);
            infoln!("paho network connect to {}:{}...", server, port);
            let addr = str_to_cstring(server);
            try_io_err!(
                NetworkConnect(
                    &mut self.network,
                    addr.as_ptr() as *mut core::ffi::c_char,
                    port as c_int,
                ),
                "connect to mqtt server failed".to_string()
            );
            debugln!(
                "connected fd {} read {:p} write {:p}",
                self.network.my_socket,
                self.network.mqttread.as_ref().unwrap(),
                self.network.mqttwrite.as_ref().unwrap()
            );
            debugln!("paho mqtt client init");
            MQTTClientInit(
                &mut self.client,
                &mut self.network,
                opts.command_timeout,
                self.send_buffer.as_mut_ptr(),
                self.send_buffer.len() as size_t,
                self.recv_buffer.as_mut_ptr(),
                self.recv_buffer.len() as size_t,
            );

            let client_id = str_to_cstring(opts.client_id);
            let user = str_to_cstring(opts.user);
            let pwd = str_to_cstring(opts.passwd);
            let mut data: MQTTPacket_connectData = MQTTPacket_connectData {
                struct_id: [
                    b'M' as c_char,
                    b'Q' as c_char,
                    b'T' as c_char,
                    b'C' as c_char,
                ],
                struct_version: 0,
                MQTTVersion: opts.ver,
                clientID: MQTTString {
                    cstring: client_id.as_ptr() as *mut c_char,
                    lenstring: MQTTLenString {
                        len: 0,
                        data: null_mut(),
                    },
                },
                keepAliveInterval: opts.keep_alive_interval as c_ushort,
                cleansession: 1,
                willFlag: 0,
                will: MQTTPacket_willOptions {
                    struct_id: [
                        b'M' as c_char,
                        b'Q' as c_char,
                        b'T' as c_char,
                        b'W' as c_char,
                    ],
                    struct_version: 0,
                    topicName: MQTTString {
                        cstring: null_mut(),
                        lenstring: MQTTLenString {
                            len: 0,
                            data: null_mut(),
                        },
                    },
                    message: MQTTString {
                        cstring: null_mut(),
                        lenstring: MQTTLenString {
                            len: 0,
                            data: null_mut(),
                        },
                    },
                    retained: 0,
                    qos: 0,
                },
                username: MQTTString {
                    cstring: user.as_ptr() as *mut c_char,
                    lenstring: MQTTLenString {
                        len: 0,
                        data: null_mut(),
                    },
                },
                password: MQTTString {
                    cstring: pwd.as_ptr() as *mut c_char,
                    lenstring: MQTTLenString {
                        len: 0,
                        data: null_mut(),
                    },
                },
            };
            if let Some(ref lwt) = opts.lwt {
                data.willFlag = 1;
                data.will.topicName = MQTTString {
                    cstring: null_mut(),
                    lenstring: MQTTLenString {
                        len: lwt.topic.len() as c_int,
                        data: lwt.topic.as_ptr() as *mut c_char,
                    },
                };
                data.will.message = MQTTString {
                    cstring: null_mut(),
                    lenstring: MQTTLenString {
                        len: lwt.payload.len() as c_int,
                        data: lwt.payload.as_ptr() as *mut c_char,
                    },
                };
                data.will.qos = lwt.qos as c_char;
                data.will.retained = if lwt.retain { 1 } else { 0 };
            }

            debugln!(
                "opts(rust): client_id {} user {} pwd {}",
                opts.client_id,
                opts.user,
                opts.passwd
            );

            debugln!(
                "opts: client_id {} user {} pwd {}",
                cstr_to_str(data.clientID.cstring),
                cstr_to_str(data.username.cstring),
                cstr_to_str(data.password.cstring),
            );

            infoln!(
                "paho mqtt isconnected {}, send_buffer {}-{:p} recv_buffer {}-{:p} ipstack {:p}, connect...",
                self.client.isconnected,
                self.client.buf_size, self.client.buf,
                self.client.readbuf_size, self.client.readbuf,
                self.client.ipstack,
            );
            try_io_err!(
                MQTTConnect(&mut self.client, &mut data),
                String::from("connect mqtt server failed")
            );
            Ok(())
        }
    }

    fn disconnect(&mut self) -> Result<()> {
        unsafe {
            NetworkDisconnect(&mut self.network);
            try_io_err!(
                MQTTDisconnect(&mut self.client),
                String::from("disconnect mqtt failed")
            );
        }
        Ok(())
    }

    /// Subscribes to `topic` with the given `qos` and routes its messages to
    /// `handle_message`, with this client as the callback's user data.
    ///
    /// # Errors
    ///
    /// * `Error::Other("dup")` when `topic` is already recorded as subscribed.
    /// * The error produced by `try_io_err!` when `MQTTSubscribe` fails.
    fn subscribe(&mut self, topic: &str, qos: u8) -> Result<()> {
        unsafe {
            infoln!("subscribe {}", topic);
            // The filter is stored in `self.topics` on success so that it is
            // not dropped: MQTTSubscribe only keeps the pointer.
            let filter = str_to_cstring(topic);
            if self.topics.contains(&filter) {
                return Err(Error::Other(String::from("dup")));
            }
            try_io_err!(
                MQTTSubscribe(
                    &mut self.client,
                    filter.as_ptr() as *const c_char,
                    qos as QoS,
                    Some(handle_message),
                    self as *mut PahoMqttClient as *mut c_void,
                ),
                String::from("subscribe failed!")
            );
            self.topics.push(filter);
            Ok(())
        }
    }

    /// Cancels the subscription for `topic` and forgets its stored filter.
    ///
    /// The stored filter is removed only after `MQTTUnsubscribe` succeeded, so
    /// the C string stays alive for as long as the subscription may exist.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `try_io_err!` when `MQTTUnsubscribe`
    /// fails; the recorded topic list is left untouched in that case.
    fn unsubscribe(&mut self, topic: &str) -> Result<()> {
        unsafe {
            infoln!("unsubscribe {}", topic);
            let top = str_to_cstring(topic);
            try_io_err!(
                MQTTUnsubscribe(&mut self.client, top.as_ptr() as *const c_char),
                String::from("unsubscribe failed!")
            );
            if let Some(pos) = self.topics.iter().position(|known| *known == top) {
                self.topics.remove(pos);
            }
            Ok(())
        }
    }

    /// Publishes `payload` on `topic`.
    ///
    /// * `qos` - quality-of-service level, passed through to the C client.
    /// * `retain` - whether the retained flag is set on the message.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `try_io_err!` when `MQTTPublish` fails.
    fn publish(&mut self, topic: &str, payload: &[u8], qos: u8, retain: bool) -> Result<()> {
        unsafe {
            let c_topic = str_to_cstring(topic);
            // The message only borrows `payload`; nothing is copied here.
            let mut packet = MQTTMessage {
                qos: qos as QoS,
                retained: match retain {
                    true => 1,
                    false => 0,
                },
                dup: 0,
                id: 0,
                payload: payload.as_ptr() as *mut c_void,
                payloadlen: payload.len() as size_t,
            };
            try_io_err!(
                MQTTPublish(
                    &mut self.client,
                    c_topic.as_ptr() as *const c_char,
                    &mut packet
                ),
                String::from("publish failed")
            );
            Ok(())
        }
    }

    /// Reports whether the C client's `isconnected` flag is non-zero.
    fn is_connected(&self) -> bool {
        self.client.isconnected != 0
    }

    fn pull_data(&mut self, timeout: u32) -> Result<()> {
        try_io_err!(
            unsafe { MQTTYield(&mut self.client, timeout as c_int) },
            String::from("pull data failed")
        );
        Ok(())
    }
}

/// Disconnects on drop.
impl Drop for PahoMqttClient {
    /// Logs the drop and calls `disconnect`; a failure is deliberately
    /// discarded because `drop` cannot report it.
    fn drop(&mut self) {
        infoln!("drop pahomqttclient!");
        self.disconnect().ok();
    }
}

/// C callback registered with `MQTTSubscribe`; converts the incoming message
/// and passes it to [`PahoMqttClient::handle_msg`].
///
/// The call is ignored when `user_data`, `arg` or the message pointer inside
/// `arg` is null. A null or zero-length payload is delivered as an empty
/// slice instead of being turned into a slice from a null pointer.
///
/// # Safety
///
/// * `user_data` must be null or the `PahoMqttClient` pointer that was passed
///   to `MQTTSubscribe`, still alive and not moved since.
/// * `arg` must be null or point to a valid `MessageData` whose payload
///   pointer/length describe readable memory for the duration of the call.
///
/// NOTE(review): the `&mut PahoMqttClient` created here exists while
/// `pull_data` still holds `&mut self` around `MQTTYield`; confirm whether
/// that aliasing is acceptable for this project.
unsafe extern "C" fn handle_message(arg: *mut MessageData, user_data: *mut c_void) {
    debugln!("handle message(c) {:p} user_data {:p}", arg, user_data);
    if user_data.is_null() || arg.is_null() {
        return;
    }
    let my = &mut *(user_data as *mut PahoMqttClient);
    let msg = &(*arg);
    let top = from_mqttstring_to_str(msg.topicName);
    debugln!("topic {}", top);
    if msg.message.is_null() {
        return;
    }
    let m = &(*msg.message);
    debugln!("convert message");
    // `from_raw_parts` requires a non-null pointer even for length 0.
    let payload: &[u8] = if m.payload.is_null() || m.payloadlen == 0 {
        &[]
    } else {
        from_raw_parts(m.payload as *const u8, m.payloadlen as usize)
    };
    let m = MqttMessage {
        qos: m.qos as u8,
        retained: m.retained,
        dup: m.dup,
        msg_id: m.id,
        payload,
    };
    debugln!("call client::handle_msg");
    my.handle_msg(top, &m);
}

/// Converts a Paho `MQTTString` into a string slice.
///
/// Resolution order:
/// 1. a non-null `cstring` is converted with `cstr_to_str`;
/// 2. otherwise a non-null `lenstring.data` with a positive `lenstring.len`
///    is converted with `cstr_with_len_to_str`;
/// 3. anything else, including a null `s`, yields `""`.
///
/// NOTE(review): the function is safe to call yet dereferences a raw pointer
/// and returns a caller-chosen lifetime; callers must make sure `s` and the
/// bytes it refers to stay valid for as long as the result is used.
pub fn from_mqttstring_to_str<'a>(s: *const MQTTString) -> &'a str {
    unsafe {
        match s.as_ref() {
            Some(text) if !text.cstring.is_null() => cstr_to_str(text.cstring),
            Some(text) if text.lenstring.len > 0 && !text.lenstring.data.is_null() => {
                cstr_with_len_to_str(text.lenstring.data, text.lenstring.len as usize)
            }
            _ => "",
        }
    }
}
